AWS上でApache Hudiを動かす
データアナリティクス事業本部の森脇です。
今回は、AWS上でApache Hudiを動かしてみたいと思います。
Apache Hudiとは?
Apache Software Foundation(以降、ASF)のプロジェクトであるOSSです。
Apache Hudi公式サイト(https://hudi.apache.org/)
データレイクを構築する際に利用可能で、ストリームデータ、ファイルデータをクラウドストレージ上で管理することが可能になります。
Spark上で動作し、以下のような機能が特徴です。
- 高速なUpsert
- ロールバック機能
- データリカバリ
元々はUberが開発していたプロジェクトですが、ASFに寄贈されたようです。
AWS上で動かす際には、いくつか選択肢があります。
Amazon EMRを使用する
Amazon EMRには、Apache Hudiがプリインストールされています。
そのため、EMRを使うだけで利用することが可能です。
AWS Glueを利用する
AWS GlueのGlueジョブを使ってApache Hudiを動かすことが可能です。
今回はこちらの方法を試してみました。
AWS GlueでApache Hudiを動かす
Apache Hudiを動かすために、AWS環境上にリソースを作成していきます。
IAMロール
「glue.amazonaws.com」と信頼関係を持つIAMロールを作成し、以下のポリシーを割り当てます。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:*", "logs:*"], "Resource": "*" } ] }
S3バケット
3つのバケットを予め作成しておきます。
- Glueスクリプト保存用
- Glueの一時ディレクトリ用
- Apache Hudi本体のjarライブラリ配置用
- Hudiのデータ管理用
作成後、「Apache Hudi本体のjarライブラリ配置用」バケットには以下のライブラリを配置しておきます。
Glueジョブ
マネジメントコンソールからGlueジョブを作成します。
以下設定のジョブを作成しました。
(特に記載が無い設定はデフォルトのままです)
設定名 | 値 |
---|---|
名前 | ApacheHudiSampleJob |
IAMロール | 前述の手順で作成したIAMロール名 |
Type | Spark |
Glue version | Spark 2.4, Python 3 with improved job startup times (Glue Version 2.0) |
このジョブ実行 | ユーザーが作成する新しいスクリプト |
スクリプトが保存されている S3 パス | 前述の手順で作成したS3バケット |
一時ディレクトリ | 前述の手順で作成したS3バケット |
依存 JARS パス | 前述の手順で配置した2つのライブラリパスをカンマ区切りで指定 |
スクリプトを記述
Quick Startのサンプルを利用します。
今回はデータのimportとクエリを試してみます。
Quich Startの内容と、Glueジョブ設定を組み合わせた以下のスクリプトを設定します。
import sys from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate() sc = spark.sparkContext glueContext = GlueContext(sc) job = Job(glueContext) job.init(args['JOB_NAME'], args) # クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成 dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator() inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10)) df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.show() tableName = 'hudi_sample_table' # テーブル名 bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット basePath = f's3://{bucketName}/{tableName}' # Hudiのオプション hudi_options = { 'hoodie.table.name': tableName, # テーブル名 # 書き込みオプション 'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名 'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名 'hoodie.datasource.write.table.name': tableName, # テーブル名 'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別 'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される) 'hoodie.upsert.shuffle.parallelism': 2, # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用) 'hoodie.insert.shuffle.parallelism': 2 # insert時の並列数 } # データの書き込み # 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する df.write.format("hudi"). \ options(**hudi_options). \ mode("overwrite"). \ save(basePath) # 書き込んだ結果をSpark SQLでクエリ tripsSnapshotDF = spark. \ read. \ format("hudi"). \ load(basePath + "/*/*/*/*") tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show() job.commit()
Glueジョブの実行
ジョブを実行します。
正常終了した後にCloudwatch Logを確認すると、データのクエリが正しく行えていることを確認できます。
また、Hudiのデータ管理用S3バケットを確認すると、Hudiが生成したparquetファイル、メタデータファイルを確認することが可能です。
$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake 2020-11-30 06:50:42 0 hudi_sample_table/.hoodie/.aux/.bootstrap/.fileids_$folder$ 2020-11-30 06:50:42 0 hudi_sample_table/.hoodie/.aux/.bootstrap/.partitions_$folder$ 2020-11-30 06:50:42 0 hudi_sample_table/.hoodie/.aux/.bootstrap_$folder$ 2020-11-30 06:50:42 0 hudi_sample_table/.hoodie/.aux_$folder$ 2020-11-30 06:50:41 0 hudi_sample_table/.hoodie/.temp_$folder$ 2020-11-30 06:50:53 3788 hudi_sample_table/.hoodie/20201129215037.commit 2020-11-30 06:50:45 0 hudi_sample_table/.hoodie/20201129215037.commit.requested 2020-11-30 06:50:46 2192 hudi_sample_table/.hoodie/20201129215037.inflight 2020-11-30 06:50:41 0 hudi_sample_table/.hoodie/archived_$folder$ 2020-11-30 06:50:43 238 hudi_sample_table/.hoodie/hoodie.properties 2020-11-30 06:50:41 0 hudi_sample_table/.hoodie_$folder$ 2020-11-30 06:50:51 93 hudi_sample_table/americas/brazil/sao_paulo/.hoodie_partition_metadata 2020-11-30 06:50:52 437450 hudi_sample_table/americas/brazil/sao_paulo/23d41e97-46ae-4380-bc0e-eee2a282f490-0_0-8-28_20201129215037.parquet 2020-11-30 06:50:50 0 hudi_sample_table/americas/brazil/sao_paulo_$folder$ 2020-11-30 06:50:50 0 hudi_sample_table/americas/brazil_$folder$ 2020-11-30 06:50:51 93 hudi_sample_table/americas/united_states/san_francisco/.hoodie_partition_metadata 2020-11-30 06:50:52 437835 hudi_sample_table/americas/united_states/san_francisco/7e942885-59dd-47d2-b7e2-707d2a603594-0_1-8-29_20201129215037.parquet 2020-11-30 06:50:50 0 hudi_sample_table/americas/united_states/san_francisco_$folder$ 2020-11-30 06:50:50 0 hudi_sample_table/americas/united_states_$folder$ 2020-11-30 06:50:50 0 hudi_sample_table/americas_$folder$ 2020-11-30 06:50:51 93 hudi_sample_table/asia/india/chennai/.hoodie_partition_metadata 2020-11-30 06:50:52 437227 hudi_sample_table/asia/india/chennai/edb8efc4-b672-4d4c-9254-f3b276d53387-0_2-8-30_20201129215037.parquet 2020-11-30 06:50:50 0 hudi_sample_table/asia/india/chennai_$folder$ 2020-11-30 06:50:50 0 hudi_sample_table/asia/india_$folder$ 2020-11-30 06:50:50 0 hudi_sample_table/asia_$folder$ 2020-11-30 06:50:40 0 hudi_sample_table_$folder$
パーティションが切られていることもわかりますね。
まとめ
AWS Glueを使用してApache Hudiを動かしてみました。
今回は基本的な操作のみ試しましたが、次回はデータリカバリ、ロールバック機能などを試そうと思います。
参考
※Apache®、Apache Hudi、Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。